<!DOCTYPE html>
<html lang="en">

<head>
  <meta charset="utf-8">
  <meta http-equiv="X-UA-Compatible" content="IE=edge">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  
  <title>DtCraft - High-performance Cluster Computing Engine</title>
  <base href="../">

  <link rel="icon" href="images/favicon.ico">

  <!-- Bootstrap core CSS -->
  <link rel="stylesheet" href="css/bootstrap.min.css">
  <link rel="stylesheet" href="css/dtcraft.css">
  <script type="text/javascript" src="js/jquery-3.2.1.min.js"></script>
  <script type="text/javascript" src="js/bootstrap.min.js"></script>
  <script type="text/javascript" src="js/run_prettify.js"></script>

</head>


<body>

<div class="container">

  <script src="js/dtcraft-navbar.js?random=<?php echo uniqid(); ?>"></script>

  <div class="content">
  
  <h1>Input/Output Stream</h1>
  <p>In the previous chapter, we covered the basics to create a write event and a read event.
  Most of the time, an application needs to perform <em>buffered</em> data operations in addition to responding to events.
  Read events are normally <em>passive</em> while write events are <em>proactive</em>.
  When we want to read data, the control flow looks like:
    <ol>
      <li>Register a read callback.</li>
      <li>Wait for the device to become readable.</li>
      <li>Read as much as data from the kernel buffer to our data buffer.</li>
    </ol>
  </p>
  <p>
  On the other hand, the control flow to write data has the following pattern:
    <ol>
      <li>Write data to our write buffer.</li>
      <li>Wait for the device to become writable.</li>
      <li>Flush the data from our data buffer to the kernel buffer in the write callback.</li>
    </ol>
  
  Since these two design patterns are very common, DtCraft provides two classes, 
  <code>InputStream</code> and <code>OutputStream</code> for them.
  DtCraft currently suppors only stream-oriented protocols such as pipe and TCP socket.
  We may add support for datagram-oriented protocols like UDP in the future.
  The source code can be found in <code>include/dtc/ipc/ipc.hpp</code> and
  <code>src/ipc/ipc.cpp</code>.
  </p>


  <h3>Input Stream</h3>
  
  <p><code>InputStream</code> is a class derived from <code>ReadEvent</code> and has an input buffer 
  <code>InputStreamBuffer</code> in charge of read-related operations.
  The structure of <code>InputStream</code> is very similar to <code>ReadEvent</code>.
  When the associated device becomes readable, the reactor dispatches it to a worker thread to invoke
  the callback.</p>

  <p>There are many ways to read data from <code>InputStream</code>.
  First, you could get its raw file descriptor and stick with POSIX 
  <a href="http://man7.org/linux/man-pages/man2/read.2.html">read</a>.
  Second, you could use read operations defined in its public member <code>isbuf</code>.
  Third, you could call its thread-safe operator <code>()</code> to deserialize data in binary format.
  We will discuss our built-in deserialization library in later section. </p>
  
  <h5>Example: Create an Input Stream Event</h5>
  <p>The example below creates an input stream event and synchronize its input buffer with the underlying
  device in its callback.</p>
<pre><code class="prettyprint lang-cpp">auto is = reactor.insert&lt;InputStream&gt;(
  std::move(device),
  [] (dtc::InputStream& istream) {
    try {
      istream.isbuf.sync();
    }
    catch(const std::exception& e) {
      std::&lt;&lt; e.what() &lt;&lt; '\n';
      std::exit(EXIT_FAILURE);
    }
  }
);
</code></pre>
  
  <div class="info notes">
  <p><b>Note:</b> The input stream event is level-trigger. The reactor keeps invoking its callback 
  until all data is drained out from the kernel buffer to the user-space buffer.</p>
  </div>
  
  <h3>Output Stream</h3>
  
  <p><code>OutputStream</code> is a class derived from <code>WriteEvent</code> and has a output buffer 
  <code>OutputStreamBuffer</code> in charge of write-related operations.
  Unlike <code>WriteEvent</code> which requires users to explicitly call <code>thaw</code> to 
  bring a write event back for polling,
  <code>OutputStreamBuffer</code> automatically notifies the reactor when the associated buffer has data.
  In other words, when you perform write operations through <code>OutputStream</code>,
  it will unfreeze itself for polling until the buffer becomes empty.</p>
  
  <p>There are many ways to write data through <code>OutputStream</code>.
  First, you could get its raw file descriptor and stick with POSIX 
  <a href="http://man7.org/linux/man-pages/man2/write.2.html">write</a>.
  Second, you could use write operations defined in its public member <code>osbuf</code>.
  Third, you could call its thread-safe operator <code>()</code> to serialize data in binary format.
  We will discuss our built-in serialization library in later section. </p>

  
  <h5>Example: Create a Output Stream Event</h5>
  <p>The example below creates a output stream event and sends a test message.
  The callback synchronizes its output buffer with the underlying device.</p>
<pre><code class="prettyprint lang-cpp">auto os = reactor.insert&lt;OutputStream&gt;(
  std::move(device),
  [] (dtc::OutputStream& ostream) {
    try {
      ostream.osbuf.sync();
    }
    catch(const std::exception& e) {
      std::&lt;&lt; e.what() &lt;&lt; '\n';
      std::exit(EXIT_FAILURE);
    }
  }
);

for(int i=0; i&lt;4; ++i) {
  std::thread t([os](){
    (*os)(std::string{"hello world from thread "} + std::to_string(i));
  });
}
</code></pre>
  
  
  <div class="info notes">
  <p><b>Note:</b> The reactor keeps invoking the callback of a output stream event when 
  both of its output buffer has data pending and the associated device becomes writable,
  until all data are flushed.</p>
  </div>
  
  <h1>Stream Buffer</h1>

  <p>Stream buffer implements a queue of bytes, adding data to the end and removing data from the front.
  It provides a variety of read and write operations, and works seamlessly with our built-in serialization
  and deserialization library.
  DtCraft provides two types of stream buffers, <code>OutputStreamBuffer</code> and 
  <code>InputStreamBuffer</code>. 
  Each stream buffer has a mutex to protect the underlying queue from race condition.
  The source code of stream buffer can be found in <code>include/dtc/ipc/streambuf.hpp</code>
  and <code>src/ipc/streambuf.cpp</code>.
  </p>

  <h3>Input Stream Buffer</h3>
  <p>An input stream buffer contains a linear character array to buffer input operations.
  It can synchronize data with a given device and perform read operations on the buffer. 
  By default, it is safe to access 
  an input stream buffer from multiple threads. 
  The code below shows a list of common methods provided by the input stream buffer.
  </p>  

<pre><code class="prettyprint lang-cpp">class InputStreamBuffer {
  std::streamsize in_avail() const;
  std::streamsize sync();
  std::streamsize read(void* array, std::streamsize n);
  std::streamsize copy(void* array, std::streamsize n) const;
  std::streamsize drop(std::streamsize n);
  std::string_view string_view() const;
  void device(Device* ptr);
  Device* device() const;
};
</code></pre>

  <p>
  These methods are self-explanatory by their names.
  <ul>
  <li><code>in_avail</code> returns the number of available characters in the buffer.</li>
  <li><code>sync</code> performs an one-time synchronization with the associated device,
  returning the number of bytes synchronized or throwing any exception by <code>Device::read</code>.
  If no device is associated with the input stream buffer, <code>sync</code> returns -1.</li>
  </li>
  <li><code>read</code> extracts up to <code>n</code> bytes from the input buffer 
  to <code>array</code>, returning the actual number of bytes extracted.</li>
  <li><code>copy</code> copies up to <code>n</code> bytes from the input buffer 
  to <code>array</code>, returning the actual number of bytes copied.</li>
  <li><code>drop</code> removes up to <code>n</code> bytes from the input buffer by simply advancing the buffer pointer.</li>
  <li><code>string_view</code> returns a constant character description over the input buffer.</li>
  <li><code>device</code> lets you associate a new device to
  or get the present device of an input stream buffer.</li>
  </ul>
  </p>

  <p>The input stream buffer dynamically increases its buffer size during <code>sync</code> to 
  accommondate data from the device.</p>
  
  <h5>Example: Create an Input Stream Buffer</h5>
  <p>The example below creates an input stream buffer and associates it with a device.</p>
<pre><code class="prettyprint lang-cpp">InputStreamBuffer isbuf (device.get());
</code></pre>
  
  <h5>Example: Read all Device Data to an Input Stream Buffer</h5>
  <p>The example below tries to drain all data from a device to an input stream buffer.</p>
<pre><code class="prettyprint lang-cpp">while(1) {
  try {
    if(auto ret = isbuf.sync(); ret == -1) {  // EAGAIN or EWOULDBLOCK
      std::cout &lt;&lt; "Device has no more data to read.\n";
      break;
    }
  }
  catch (const std::exception& e) {  // Error occurs.
    std::cout &lt;&lt; e.what() &lt;&lt; '\n';
  }
}
</code></pre>
  
  <h5>Example: Inspect an Input Stream Buffer</h5>
  <p>The example below demonstrates how to copy and inspect data from an input stream buffer.</p>
<pre><code class="prettyprint lang-cpp">auto view = isbuf.string_view();
std::vector&lt;char&gt; vec(isbuf.in_avail());
auto n = isbuf.copy(vec.data(), vec.size());
assert(memcmp(view.data(), vec.data(), sizeof(char)*n) == 0);
</code></pre>
  
  <div class="info notes">
  <p><b>Note 1:</b> The input stream buffer shares NO ownership of the device. 
  It is user's responsibility to verify the lifetime of the device to prevent synchronization error.</p>
  <p><b>Note 2:</b> The string view of an input stream buffer is a snapshot. 
  Subsequent read operations can invalidate its pointer value.</p>
  </div>

  
  <h3>Output Stream Buffer</h3>
  <p>A output stream buffer contains a linear character array to buffer output operations.
  It defines methods to write data to the buffer and synchronize the buffer with the associated device.
  By default, it is safe to access a output stream buffer
  from multiple threads. The code below shows a list of common methods provided by a output stream buffer.
  </p> 
  
<pre><code class="prettyprint lang-cpp">class OutputStreamBuffer {
  std::streamsize out_avail() const;
  std::streamsize flush();
  std::streamsize sync();
  std::streamsize write(const void* array, std::streamsize n);
  std::streamsize copy(void* array, std::streamsize n) const;
  std::string_view string_view() const;
  void device(Device* ptr);
  Device* device() const;
};
</code></pre>
  
  <p>
  These methods are self-explanatory by their names.
  <ul>
  <li><code>out_avail</code> returns the number of available characters in the buffer.</li>
  <li><code>flush</code> writes all buffered data to the associated device, 
  returning the number of bytes flushed or throwing any exception by <code>Device::write</code>.
  If no device is associated with the output stream buffer, <code>flush</code> returns -1.</li>
  </li>
  <li><code>sync</code> performs an one-time synchronization with the associated device,
  returning the number of bytes synchronized or throwing any exception by <code>Device::write</code>.
  If no device is associated with the output stream buffer, <code>sync</code> returns -1.</li>
  <li><code>write</code> puts up to <code>n</code> bytes from the <code>array</code>
  to the output buffer, returning the actual number of bytes inserted.</li>
  <li><code>copy</code> copies up to <code>n</code> bytes from the output buffer 
  to <code>array</code>, returning the actual number of bytes copied.</li>
  <li><code>string_view</code> returns a constant character description over the output buffer.</li>
  <li><code>device</code> lets you associate a new device to
  or get the present device of a output stream buffer.</li>
  </ul>
  </p>

  <p>The output stream buffer dynamically doubles its size during <code>write</code> to ensure enough space
  accommodate written data.</p>
  
  <h5>Example: Create a Output Stream Buffer</h5>
  <p>The example below creates a output stream buffer and associates it with a device.</p>
<pre><code class="prettyprint lang-cpp">OutputStreamBuffer osbuf (device.get());
</code></pre>
  
  <h5>Example: Flush a Output Stream Buffer</h5>
  <p>The example below writes all data in a output stream buffer via its underlying write function.</p>
<pre><code class="prettyprint lang-cpp">try {
  if(auto ret = osbuf.flush(); ret == -1) {  // EAGAIN or EWOULDBLOCK
    std::cout &lt;&lt; "Device is busy. Try later.\n";
  }
  else assert(osbuf.out_avail() == 0);
}
catch (const std::exception& e) {  // Error occurs.
  std::cout &lt;&lt; e.what() &lt;&lt; '\n';
}
</code></pre>
  
  <h5>Example: Write Data to a Output Stream Buffer</h5>
  <p>The example below writes a character sequence to a output stream buffer.</p>
<pre><code class="prettyprint lang-cpp">std::string data {"a character sequence"};
auto n = osbuf.write(data.c_str(), data.size());
assert(osbuf.out_avail() == n && n == data.size() && osbuf.string_view() == data);
</code></pre>
  
  <div class="info notes">
  <p><b>Note 1:</b> The output stream buffer shares NO ownership of the device. 
  It is user's responsibility to verify the lifetime of the device to prevent synchronization error.</p>
  <p><b>Note 2:</b> The string view of a output stream buffer is a snapshot. 
  Subsequent write operations can invalidate its pointer value.</p>
  </div>


  <h1>Archiver</h1>

  <p>DtCraft has a built-in library, <em>archiver</em>, to serialize data and deserialize data
  from an input or a output stream buffer.
  Our archiver directly works on top of our stream buffer objects, requiring no extra copy in user space.
  It supports most of C++ standard containers and classes, and can be extended to handle your own types.
  The source code can be found in <code>include/dtc/archive/</code>.</p>
  
  <h3>POD and STL Supports</h3>

  <p>By default, our archiver supports the following types:
  <ul>
    <li>Arithmetic POD (bool, char, int, float, double, etc).</li>
    <li>Enumeration type.</li>
    <li>C++ standard sequence containers, <code>array</code>, <code>vector</code>, 
        <code>list</code>, <code>deque</code>, and <code>forward_list</code>.</li>
    <li>C++ standard associative containers, <code>set</code>, <code>multiset</code>, 
        <code>map</code>, and <code>multimap</code>.</li>
    <li>C++ standard unordered associative containers, <code>unordered_set</code> and <code>unordered_map</code>.</li>
    <li>C++ standard chrono classes, <code>duration</code> and <code>time_point</code>.</li>
    <li>C++ standard optional class, <code>optional</code>.</li>
    <li>C++ standard variant class, <code>variant</code>.</li>
    <li>C++ standard tuple class, <code>tuple</code>.</li>
    <li><a href="http://eigen.tuxfamily.org/index.php?title=Main_Page">Eigen</a> matrix type.</li>
  </ul></p>

  
  <h3>Binary Input/Output Archiver</h3>

  <p>We use <code>BinaryOutputArchiver</code> to serialize data and 
  <code>BinaryInputArchiver</code> to deserialize data in binary format.
  Binary archiver is designed to produce compact data representation at bit level
  and is not human readable. It operates on either <code>OutputStreamBuffer</code>
  or <code>InputStreamBuffer</code>. We use operator <code>()</code> to perform
  serialization and deserialization on a parameter pack. 
  The return is a value of type <code>std::streamsize</code> telling how many bytes are transmitted.</p>
  
  <h5>Example: Archive Data in Binary Format</h5>

  <p>The example below shows how to serialize and deserialize data in binary format.</p>

<pre><code class="prettyprint lang-cpp">dtc::OutputStreamBuffer osbuf;
dtc::InputStreamBuffer isbuf;
dtc::BinaryOutputArchiver oar(osbuf);
dtc::BinaryInputArchiver iar(isbuf);

int o1 {1}, i1;
std::string o2 {"test"}, i2;
std::variant&lt;int, double, std::vector&lt;char&gt;&gt; o3 {1.2}, i3;

std::streamsize num_obytes = oar(o1, o2, o3);
std::streamsize num_ibytes = iar(i1, i2, i3);

assert(num_obytes == num_ibytes && o1 == i1 && o2 == i2 && o3 == i3);
</code></pre>

  <h3>Archive a Custom Type</h3>
  <p>To make your own type work with our archiver, add a template method <code>archive</code> in the public
  scope to your class. This lets our archiver know which data members to serialize and deserialize.</p>

  <h5>Example: Add Archive Method to Your Classes</h5>
  <p>The example below shows how to adapt your own type to our archiver interface.</p>

<pre><code class="prettyprint lang-cpp">struct MyType {
  int a;
  double b;
  std::tuple&lt;std::string, char&gt; c;

  template &lt;typename ArchiverT&gt; 
  auto archive(ArchiverT& ar) {
    return ar(a, b, c);
  }
};
</code></pre>
  
  <div class="info notes">
  <p><b>Note:</b> If you do not apply the archiver operator to multiple data at one time, 
  you must return the sum of individual returns.</p>
  </div>
  
  <h1>Where to Go from Here?</h1>
  
  <p>Congratulations! You have studied one core component in DtCraft.
  In the next chapter, we will go over the <em>stream graph</em> and key graph components to get
  your distributed applications up and running.</p>
	

  </div>  <!-- end of content -->
  
  <script src="js/dtcraft-footer.js"></script>

</div>

</body>


</html>


